lmdb changes

This commit is contained in:
Yaroslav Polyakov 2025-05-05 17:38:27 +07:00
parent 44477b69d9
commit 53499c4c96
11 changed files with 266 additions and 46 deletions

View file

@ -9,8 +9,17 @@ This project is absolutely unofficial, not related to 2GIS.ru.
pipx install git+https://github.com/yaroslaff/antifraud2gis
~~~
## Basic operations
## Fraud detection
- `af2gis` - main program for most user operations
- `af2dev` - program for developers
- `af2web` - webserver (probably you do not need it)
- `af2worker` - background worker (probably you do not need it too)
If you just want to run fraud detection on companies by 2GIS object-id, you only need `af2gis`.
### Fraud detection
~~~
# by OID:
af2gis fraud 141265769338187
@ -22,6 +31,76 @@ af2gis fraud nskzoo
af2gis aliases
~~~
### Explore town (crawl, index new companies)
You need to have at least a few users already crawled (from previous fraud detections).
Explore will submit jobs to redis queue, and you need to have af2worker running to process queue.
~~~
af2dev explore -t Новосибирск
~~~
`-t` will limit only to specific town.
"Explore" may look slow or even hangs, but you may check summary or queue status in other tab every 3-5 minutes to see progress. (It really hangs only if af2worker is not running)
### local db summary
~~~
$ af2gis summary
2025-05-04 16:15:35 SUMMARY Companies: total=3, nerr=0 ncalc=3 nncalc=0
2025-05-04 16:15:35 SUMMARY LMDB prefixes: {'object': 12053, 'user': 900}
~~~
Companies (on the first line) are companies which have been downloaded; objects (on the second line) are companies which are known but not downloaded.
### Queue status
Use `af2dev queue` (or just `q`).
~~~
$ af2dev q
Queue report
Worker status: started...
Dramatiq queue: 0
Tasks (0): []
Trusted (5/20):
141265771582384 СберБанк, Банки (3.6) None
70000001025692296 Нск Шашлык, Быстрое питание (4.8) None
70000001026944015 Дивногорский, управляющая компания (3) None
70000001023639031 Сибкарт моторспорт, картинг-центр (4.7) None
141265771632879 Подорожник, доступная кофейня (2.2) None
Untrusted (5/11)
70000001091636066 Культура, рюмочная (5) ['risk_users 84% (222 / 262)']
70000001091636066 Культура, рюмочная (5) ['risk_users 84% (222 / 262)']
70000001083008185 Рыдзинский, рюмочная-караоке (4.9) ['risk_users 41% (179 / 432)']
5348552838629866 Юсуповский Дворец на Мойке, музей (4.9) ['sametitle_rel 20% (5 of 1)']
5348552838629866 Юсуповский Дворец на Мойке, музей (4.9) ['sametitle_rel 20% (5 of 1)']
~~~
## Search by substring
The SQLite database has data about all companies after fraud detection.
~~~
$ af2gis search арн -t Новосибирск
{'oid': '70000001075178717', 'title': 'Пекарня, Пекарни', 'address': 'Новосибирск, Урожайная, 8', 'town': 'Новосибирск', 'searchstr': 'новосибирск пекарня, пекарни', 'rating_2gis': 4.7, 'trusted': 1, 'nreviews': 31, 'detections': ''}
{'oid': '70000001041433989', 'title': 'Пекарня, Пекарни', 'address': 'Новосибирск, Чигорина, 3', 'town': 'Новосибирск', 'searchstr': 'новосибирск пекарня, пекарни', 'rating_2gis': 4.6, 'trusted': 1, 'nreviews': 52, 'detections': ''}
~~~
The LMDB database has data about every company ever seen (even a company seen only once in a single user's reviews).
~~~
mir ~/repo/antifraud2gis $ af2dev lmdb дог
object:70000001007492207
{
"name": "Хот-дог райский, киоск фастфудной продукции",
"address": "Новосибирск, проспект Дзержинского, 23"
}
object:70000001017423547
{
"name": "Хот-дог Мастер, фуд-киоск",
"address": "Новосибирск, проспект Карла Маркса, 4а"
}
...
~~~
## Export and analyse data with JQ
~~~shell

View file

@ -1,6 +1,9 @@
import dramatiq
import sys
from ..tasks import fraud_task
import redis
import time
from ..const import REDIS_WORKER_STARTED
from ..tasks import fraud_task, set_status
from dramatiq.cli import main as dramatiq_main
from ..logger import logger, loginit
@ -8,10 +11,23 @@ from ..logger import logger, loginit
af2worker antifraud2gis.tasks
"""
def main():
    """Entry point for af2worker: set up logging/status and hand control to dramatiq."""
    r = redis.Redis(
        decode_responses=True
    )
    # dramatiq's CLI reads its options from sys.argv, so fake them here
    worker_argv = ["dramatiq", "-p1", "-t1", "antifraud2gis.tasks"]
    sys.argv = worker_argv
    level = "DEBUG"  # (or "INFO")
    loginit(level)
    logger.debug(f"Starting dramatiq worker ({level})")
    # publish liveness info so `af2dev queue` can report status and uptime
    set_status("worker started")
    r.set(REDIS_WORKER_STARTED, time.time())
    dramatiq_main()
if __name__ == "__main__":

View file

@ -26,8 +26,9 @@ from ..exceptions import AFNoCompany, AFNoTitle, AFCompanyError
from ..aliases import aliases
from .summary import printsummary
from ..tasks import submit_fraud_task, cooldown_queue
from ..const import REDIS_TASK_QUEUE_NAME, REDIS_TRUSTED_LIST, REDIS_UNTRUSTED_LIST, REDIS_WORKER_STATUS, REDIS_DRAMATIQ_QUEUE, REVIEWS_KEY, \
LMDB_MAP_SIZE
from ..const import REDIS_TASK_QUEUE_NAME, REDIS_TRUSTED_LIST, REDIS_UNTRUSTED_LIST, REDIS_WORKER_STATUS, REDIS_WORKER_STATUS_SET, \
REDIS_DRAMATIQ_QUEUE, REVIEWS_KEY, \
LMDB_MAP_SIZE, REDIS_WORKER_STARTED
from ..logger import logger
from ..session import session
from ..utils import random_company
@ -271,25 +272,28 @@ def do_provider(args, cl: CompanyList):
def lmdb_dump(args):
    """Dump LMDB records, optionally filtered by a substring of the object name.

    With no positional arg, dumps every record. With a needle, prints only
    'object:' records whose name contains the needle (case-insensitive);
    'user:' records are printed only for an unfiltered dump.
    """
    try:
        needle = args.args[0]
    except IndexError:
        needle = None
    lmdb_path = settings.lmdb_storage.as_posix()
    print("Dump", lmdb_path)
    env = lmdb.open(lmdb_path, readonly=True, map_size=LMDB_MAP_SIZE)
    with env.begin() as txn:
        with txn.cursor() as cur:
            for key, val in cur:
                if key.startswith(b'object:'):
                    data = json.loads(val.decode())
                    # fix: lowercase BOTH sides (original lowered only the
                    # needle, missing capitalized names) and tolerate records
                    # without a 'name' key
                    if needle is None or needle.lower() in data.get('name', '').lower():
                        print(key.decode())
                        print_json(data=data)
                elif key.startswith(b'user:'):
                    if not needle:
                        data = json.loads(val.decode())
                        print(key.decode())
                        print_json(data=data)
                else:
                    # unknown prefix: show the key and a short raw preview
                    print(key.decode(), val[:100])
@ -307,7 +311,7 @@ def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("cmd", choices=['company-users', 'user-reviews', 'company-reviews', 'queue', 'explore', 'provider', 'sys', 'filldb', 'dev', 'lmdb', 'convert', 'delkeys'])
parser.add_argument("cmd", choices=['company-users', 'users', 'user-reviews', 'company-reviews', 'queue', 'explore', 'provider', 'sys', 'filldb', 'dev', 'lmdb', 'convert', 'delkeys'])
parser.add_argument("-v", "--verbose", default=False, action='store_true')
parser.add_argument("--full", default=False, action='store_true')
parser.add_argument("args", nargs='*', help='extra args')
@ -354,6 +358,10 @@ def main():
for r in c.reviews():
print(r)
elif cmd == "users":
for u in User.users():
print(u)
elif cmd == "queue":
r = redis.Redis(decode_responses=True)
@ -367,6 +375,19 @@ def main():
r.delete(key)
wstatus = r.get(REDIS_WORKER_STATUS)
wstatus_set = r.get(REDIS_WORKER_STATUS_SET)
if wstatus_set:
wstatus_age = int( time.time() - float(wstatus_set))
else:
wstatus_age = None
wstarted = r.get(REDIS_WORKER_STARTED)
if wstarted:
wuptime = int(time.time() - float(wstarted))
else:
wuptime = None
tasks = r.lrange(REDIS_TASK_QUEUE_NAME, 0, -1) # возвращает list of bytes
trusted_len = r.llen(REDIS_TRUSTED_LIST)
untrusted_len = r.llen(REDIS_UNTRUSTED_LIST)
@ -378,9 +399,12 @@ def main():
lastn = 5
print("Queue report")
print(f"Worker status: {wstatus}")
print(f"Worker status: {wstatus} ({wstatus_age} sec ago)")
print(f"Worker uptime: {wuptime} sec.")
print(f"Dramatiq queue: {dqlen}")
print(f"Tasks ({len(tasks)}): {tasks[:5]} ")
tasks_suffix = '...' if len(tasks) > lastn else ''
print(f"Tasks ({len(tasks)}): {' '.join(tasks[:lastn])} {tasks_suffix}")
print(f"Trusted ({lastn}/{trusted_len}):")
for c in last_trusted[:lastn]:
# print_json(data=c)
@ -500,6 +524,8 @@ def main():
logger.info("Stopfile found, exit")
stopfile.unlink()
return
else:
print(f"Finished. Last used: {idx} submitted: {submitted}")
elif cmd == "dev":
return
@ -548,7 +574,7 @@ def main():
elif cmd == "convert":
nusers = User.nusers()
env = lmdb.open(settings.lmdb_user_storage.as_posix(), map_size=LMDB_MAP_SIZE)
env = lmdb.open(settings.lmdb_storage.as_posix(), map_size=LMDB_MAP_SIZE)
with env.begin(write=True) as txn:
for idx, u in enumerate(User.users()):

View file

@ -141,7 +141,12 @@ def main():
print(f"{len(report['relations'])} relations")
elif args.cmd == "search":
res = dbsearch(args.args[0], detection=args.detection, addr=args.town, limit=500)
try:
needle = args.args[0]
except IndexError:
needle=''
res = dbsearch('', detection=args.detection, addr=args.town, limit=500)
for rec in res:
print(rec)
if args.fmt == 'full':
@ -184,7 +189,7 @@ def main():
effectively_processed = 0
for c in cl.companies(oid=args.company, name=args.name, town=args.town, detection=args.detection, report=args.report, noreport=args.noreport):
for c in cl.companies(oid=args.company, name=args.name, town=args.town, detection=args.detection, report=args.report, noreport=args.noreport, limit=args.limit):
total_processed += 1

View file

@ -33,8 +33,6 @@ def printsummary(cl: CompanyList, full=False):
global last_summary
logger.info(f"SUMMARY request")
userpath = settings.user_storage
total = len(list(cl.companies()))
nerr = 0
@ -51,7 +49,7 @@ def printsummary(cl: CompanyList, full=False):
logger.info(f"SUMMARY Companies: {total=}, {nerr=} {ncalc=} {nncalc=}")
env = lmdb.open(settings.lmdb_user_storage.as_posix(), readonly=True)
env = lmdb.open(settings.lmdb_storage.as_posix(), readonly=True, map_size=LMDB_MAP_SIZE)
prefixes = defaultdict(int)
with env.begin() as txn:
with txn.cursor() as cur:

View file

@ -469,7 +469,7 @@ class Company:
@staticmethod
def resolve_oid(object_id: str):
""" set self.title/address from user's reviews """
env = lmdb.open(settings.lmdb_user_storage.as_posix(), map_size=LMDB_MAP_SIZE)
env = lmdb.open(settings.lmdb_storage.as_posix(), map_size=LMDB_MAP_SIZE, readonly=True)
with env.begin() as txn:
jdata = txn.get(b"object:" + object_id.encode())
@ -533,8 +533,6 @@ class CompanyList():
yield c
return

View file

@ -17,6 +17,8 @@ REDIS_TASK_QUEUE_NAME="af2gis:queue"
REDIS_TRUSTED_LIST="af2gis:last_trusted_list"
REDIS_UNTRUSTED_LIST="af2gis:last_untrusted_list"
REDIS_WORKER_STATUS="af2gis:worker_status"
REDIS_WORKER_STATUS_SET="af2gis:worker_status_set"
REDIS_WORKER_STARTED="af2gis:worker_started"
REDIS_DRAMATIQ_QUEUE="dramatiq:default"
LMDB_MAP_SIZE = 20*1024**3
LMDB_MAP_SIZE = 1 << 30

View file

@ -65,7 +65,9 @@ class EmptyUserFD(BaseFD):
# self.score['non-empty-users'] = len(self.non_empty_ratings)
self.score['empty_user_ratio'] = empty_users_ratio
logger.debug(f"empty_user_ratio {empty_users_ratio}% >= {settings.empty_user}%")
sign_empty_users_ratio = '>=' if empty_users_ratio >= settings.empty_user else '<'
logger.debug(f"empty_user_ratio {empty_users_ratio}% {sign_empty_users_ratio} {settings.empty_user}%")
logger.debug(f"empty_rating({empty_users_r:.1f}) - non_empty_rating({non_empty_users_r:.1f}) = {(empty_users_r - non_empty_users_r):.1f} >= {settings.rating_diff}")
if empty_users_ratio >= settings.empty_user and (empty_users_r - non_empty_users_r ) >= settings.rating_diff:

View file

@ -1,13 +1,15 @@
from pathlib import Path
import os
import lmdb
from dotenv import load_dotenv
from .const import LMDB_MAP_SIZE
class Settings():
def __init__(self):
load_dotenv()
self.storage = Path("~/.af2gis-storage").expanduser()
self.user_storage = self.storage / "users"
self.lmdb_user_storage = self.storage / "users.lmdb"
self.lmdb_storage = self.storage / "users.lmdb"
self.private_user_storage = self.storage / "users" / "_private.json"
self.company_storage = self.storage / "companies"
@ -81,7 +83,14 @@ class Settings():
self.user_storage.mkdir(parents=True)
if not self.company_storage.exists():
self.company_storage.mkdir(parents=True)
if not self.lmdb_storage.exists():
env = lmdb.open(self.lmdb_storage.as_posix(), map_size=LMDB_MAP_SIZE)
with env.begin(write=True):
pass
env.close()
def param_fp(self):
return f"risk_hit={self.risk_hit_th} risk_median_th={self.risk_median_th} risk_highrate_th={self.risk_highrate_th} " \
f"empty_user={self.empty_user} " \

View file

@ -8,7 +8,8 @@ from .fraud import detect
from .company import CompanyList, Company
from .exceptions import AFNoCompany, AFReportAlreadyExists
from .logger import logger
from .const import REDIS_WORKER_STATUS, REDIS_TRUSTED_LIST, REDIS_UNTRUSTED_LIST, REDIS_TASK_QUEUE_NAME, REDIS_DRAMATIQ_QUEUE
from .const import REDIS_WORKER_STATUS, REDIS_WORKER_STATUS_SET, REDIS_TRUSTED_LIST, REDIS_UNTRUSTED_LIST, \
REDIS_TASK_QUEUE_NAME, REDIS_DRAMATIQ_QUEUE
from .user import reset_user_pool
from .statistics import statistics
@ -20,7 +21,7 @@ r = redis.Redis(
decode_responses=True
)
r.set(REDIS_WORKER_STATUS, f'started...')
# r.set(REDIS_WORKER_STATUS, f'worker started as pid {os.getpid()}')
started = time.time()
processed = 0
@ -29,9 +30,17 @@ def get_qsize():
return r.llen(REDIS_DRAMATIQ_QUEUE)
def cooldown_queue(maxq: int):
    """Block until the dramatiq queue length drops below *maxq*.

    Polls every 10 seconds. Logs a single debug line on the first wait so
    the log is not flooded while the queue drains.
    """
    printed = False
    qsize = get_qsize()
    while qsize >= maxq:
        if not printed:
            # fix: message said '>' while the condition is '>='; also reuse
            # the already-fetched size instead of a second redis round-trip
            logger.debug(f"Queue size {qsize} >= {maxq}, waiting to cooldown...")
            printed = True
        time.sleep(10)
        qsize = get_qsize()
def set_status(status: str):
    """Publish the worker status to redis, together with the time it was set."""
    set_at = str(int(time.time()))
    r.set(REDIS_WORKER_STATUS, status)
    # timestamp lets readers compute how stale the status is
    r.set(REDIS_WORKER_STATUS_SET, set_at)
@dramatiq.actor
def fraud_task(oid: str, force=False):
global processed
@ -55,7 +64,7 @@ def fraud_task(oid: str, force=False):
return
print(f"{os.getpid()} task STARTED {c}")
r.set(REDIS_WORKER_STATUS, oid)
set_status(oid)
try:
score = detect(c, cl, force=force)
except AFNoCompany:
@ -65,7 +74,7 @@ def fraud_task(oid: str, force=False):
logger.warning(f"Worker: Report for {oid!r} already exists")
return
r.set(REDIS_WORKER_STATUS, f'finished {oid}')
set_status(f'finished {oid}')
print(f"{os.getpid()} task FINISHED {c}")
print("SCORE:", score)

View file

@ -11,6 +11,8 @@ import sys
import datetime
import gzip
import lmdb
import tempfile
import os
from .db import db
from .const import WSS_THRESHOLD, LOAD_NREVIEWS, SLEEPTIME, LMDB_MAP_SIZE
@ -53,14 +55,14 @@ class User:
objects = dict()
reviews = list()
env = lmdb.open(settings.lmdb_user_storage.as_posix(), map_size=LMDB_MAP_SIZE)
env = lmdb.open(settings.lmdb_storage.as_posix(), map_size=LMDB_MAP_SIZE)
with env.begin() as txn:
val = txn.get(b"user:" + self.public_id.encode())
if val:
self._reviews = json.loads(val)
return
# not found in db
if local_only is False:
loaded = False
@ -69,7 +71,7 @@ class User:
self.load_from_network()
loaded = True
except Exception as e:
print(f"Error loading user {self.public_id}: {e}")
print(f"Error loading user {self.public_id}: {type(e)} {e}")
time.sleep(5)
@ -105,10 +107,12 @@ class User:
""" """
def save_txn():
# logger.debug(f'lmdb save user {self.public_id}: {reviews}')
txn.put(b'user:' + self.public_id.encode(), json.dumps(reviews).encode())
for oid, odata in objects.items():
txn.put(b'object:' + oid.encode(), json.dumps(odata).encode())
# logger.debug(f'lmdb save object {oid}: {odata}')
txn.put(b'object:' + oid.encode(), json.dumps(odata).encode(), overwrite=False)
# prepare data structures
@ -136,7 +140,7 @@ class User:
if txn:
save_txn()
else:
env = lmdb.open(settings.lmdb_user_storage.as_posix(), map_size=LMDB_MAP_SIZE)
env = lmdb.open(settings.lmdb_storage.as_posix(), map_size=LMDB_MAP_SIZE)
with env.begin(write=True) as txn:
save_txn()
@ -150,8 +154,11 @@ class User:
if not self._reviews:
# private profile
return None
r = Review(sorted(self._reviews, key=lambda r: r['created'])[0])
try:
r = Review(sorted(self._reviews, key=lambda r: r['created'])[0])
except KeyError:
print_json(data=self._reviews)
raise
return r.created
def towns(self):
@ -196,7 +203,7 @@ class User:
# why we were called?
# print("".join(traceback.format_stack(limit=10)))
url = f'https://api.auth.2gis.com/public-profile/1.1/user/{self.public_id}/content/feed?page_size=20'
url = f'https://api.auth.2gis.com/public-profile/1.1/user/{self.public_id}/content/feed?page_size=20'
page = 0
@ -265,19 +272,88 @@ class User:
@property
def name(self):
    """User display name taken from the first cached review, or None.

    Returns None when reviews are not loaded (e.g. private profile) or when
    the review record has no 'user_name' key (older record format —
    presumably nested under 'user'; TODO confirm against crawler output).
    """
    if not self._reviews:
        return None
    # .get() replaces try/except KeyError and the duplicated None branches
    return self._reviews[0].get('user_name')
@staticmethod
def users():
    """Yield a User for every 'user:'-prefixed record in LMDB.

    Deliberately opens one short read transaction per record instead of
    holding a single long-lived one, so writers are not blocked during a
    long iteration.
    """
    env = lmdb.open(settings.lmdb_storage.as_posix(), readonly=True, map_size=LMDB_MAP_SIZE, lock=False)
    prefix = b'user:'
    next_key = prefix  # seek position for the next short transaction
    while True:
        with env.begin() as txn:
            with txn.cursor() as cur:
                if not cur.set_range(next_key):
                    return  # past the last key: done
                k = bytes(cur.key())
                if not k.startswith(prefix):
                    return  # left the 'user:' range: done
                yield User(k.decode().split(':', 1)[1])
                # smallest key strictly greater than k
                next_key = k + b'\x00'
@staticmethod
def old_users_file():
    """Legacy iterator: spool all user ids to a temp file, then yield Users.

    Spooling to disk closes the LMDB read transaction before any User is
    constructed, so User creation cannot run inside a long transaction.
    """
    env = lmdb.open(settings.lmdb_storage.as_posix(), readonly=True, map_size=LMDB_MAP_SIZE, lock=False)
    prefix = b'user:'
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(mode='w+', prefix='af2gis-users-', suffix='.txt', delete=False) as f:
            tmp_path = f.name
            print(f"Userlist in {tmp_path}")
            with env.begin() as txn:
                with txn.cursor() as cur:
                    if cur.set_range(prefix):
                        for key, _ in cur:
                            if not key.startswith(prefix):
                                break
                            public_id = key.decode().split(':', 1)[1]
                            f.write(public_id + '\n')
        with open(tmp_path, 'r') as f:
            for line in f:
                yield User(line.strip())
    finally:
        # fix: close the LMDB handle and remove the temp file even when the
        # scan raises or the generator is abandoned mid-iteration
        env.close()
        if tmp_path is not None:
            os.remove(tmp_path)
@staticmethod
def old_users_iterator():
    """Legacy iterator: yield a User for every 'user:' key in LMDB.

    Holds one long read transaction for the whole scan; prefer User.users(),
    which uses many short transactions.
    """
    env = lmdb.open(settings.lmdb_storage.as_posix(), readonly=True, map_size=LMDB_MAP_SIZE)
    with env.begin() as txn:
        with txn.cursor() as cur:
            if cur.set_range(b'user:'):
                for key, _ in cur:
                    if not key.startswith(b'user:'):
                        break
                    # fix: split once so an id containing ':' survives intact
                    # (matches split(':', 1) used by the other iterators)
                    public_id = key.decode().split(':', 1)[1]
                    yield User(public_id)
@staticmethod
def nusers():
    """Count 'user:'-prefixed records in LMDB."""
    # replaces the stale file-glob count left over from the pre-LMDB storage
    env = lmdb.open(settings.lmdb_storage.as_posix(), readonly=True, map_size=LMDB_MAP_SIZE)
    n = 0
    with env.begin() as txn:
        with txn.cursor() as cur:
            if cur.set_range(b'user:'):
                for key, _ in cur:  # value is never needed for counting
                    if not key.startswith(b'user:'):
                        break
                    n += 1
    return n
def __repr__(self):
    """Debug representation: name, cached review count (or 'not loaded'), url."""
    if self._reviews:
        rev = len(self._reviews)
    else:
        rev = "not loaded"
    return f'User({self.name} (rev: {rev}) {self.url})'