Elasticsearch: bulk updates with update_by_query, adding mapping fields, and sending query results to MQ
Bulk updates with update_by_query
POST post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "join_field": {
              "value": "post"
            }
          }
        },
        {
          "term": {
            "platform": {
              "value": "toutiao"
            }
          }
        },
        {
          "exists": {
            "field": "liked_count"
          }
        }
      ]
    }
  },
  "script": {
    "source": "ctx._source.liked_count=0",
    "lang": "painless"
  }
}
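The same bulk update can also be issued from Python through the elasticsearch-py client. This is a minimal sketch only; the connection URL is a placeholder, and the extra parameters (conflicts, wait_for_completion) are optional additions, not part of the original request.

from elasticsearch import Elasticsearch

# Placeholder credentials/host; replace with the real cluster address.
es = Elasticsearch('http://eswriter:es_password@es_ip:4000')

body = {
    "query": {
        "bool": {
            "must": [
                {"term": {"join_field": {"value": "post"}}},
                {"term": {"platform": {"value": "toutiao"}}},
                {"exists": {"field": "liked_count"}}
            ]
        }
    },
    "script": {
        "source": "ctx._source.liked_count=0",
        "lang": "painless"
    }
}

# update_by_query accepts a comma-separated index list, same as the REST call.
resp = es.update_by_query(
    index='post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04',
    body=body,
    conflicts='proceed',       # skip version conflicts instead of aborting
    wait_for_completion=True,
    request_timeout=120,
)
print(resp.get('updated'))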
Adding fields to an index mapping
PUT user_tiktok/_doc/_mapping?include_type_name=true
{
  "properties": {
    "post_signature": {
      "type": "keyword"
    },
    "post_token": {
      "type": "keyword"
    }
  }
}

PUT user_toutiao/_mapping
{
  "properties": {
    "user_token": {
      "type": "text"
    }
  }
}
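The same mapping changes can be applied from Python with indices.put_mapping. A minimal sketch, assuming a typeless (7.x-style) client; the connection URL is a placeholder.

from elasticsearch import Elasticsearch

# Placeholder credentials/host; replace with the real cluster address.
es = Elasticsearch('http://eswriter:es_password@es_ip:4000')

# put_mapping is additive: new fields are added, existing fields are untouched.
es.indices.put_mapping(
    index='user_tiktok',
    body={
        "properties": {
            "post_signature": {"type": "keyword"},
            "post_token": {"type": "keyword"}
        }
    }
)

es.indices.put_mapping(
    index='user_toutiao',
    body={
        "properties": {
            "user_token": {"type": "text"}
        }
    }
)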
Querying ES and sending the results to MQ
from celery import Celery
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, streaming_bulk
from kombu import Exchange, Queue, binding
import logging
import arrow
import pytz
import redis

pool_16_8 = redis.ConnectionPool(host='10.0.3.100', port=6379, db=8, password='EfcHGSzKqg6cfzWq')
rds_16_8 = redis.StrictRedis(connection_pool=pool_16_8)

logger = logging.getLogger('elasticsearch')
logger.disabled = False
logger.setLevel(logging.INFO)

# Credentials and host are placeholders.
es_zoo_connection = Elasticsearch('http://eswriter:es_password@es_ip:4000',
                                  dead_timeout=10, retry_on_timeout=True)

logger = logging.getLogger(__name__)


class ES(object):
    index = None
    doc_type = None
    id_field = '_id'
    version = ''
    source_id_field = ''
    aliase_field = ''
    separator = '-'
    aliase_func = None
    es = None
    tz = pytz.timezone('Asia/Shanghai')
    logger = logger

    @classmethod
    def mget(cls, ids=None, index=None, **kwargs):
        index = index or cls.index
        docs = cls.es.mget(body={'ids': ids}, doc_type=cls.doc_type, index=index, **kwargs)
        return docs

    @classmethod
    def count(cls, query=None, index=None, **kwargs):
        index = index or cls.index
        c = cls.es.count(doc_type=cls.doc_type, body=query, index=index, **kwargs)
        return c.get('count', 0)

    @classmethod
    def upsert(cls, doc, doc_id=None, index=None, doc_as_upsert=True, **kwargs):
        body = {
            "doc": doc,
        }
        if doc_as_upsert:
            body['doc_as_upsert'] = True
        id = doc_id or cls.id_name(doc)
        index = index or cls.index_name(doc)
        cls.es.update(index=index, doc_type=cls.doc_type, id=id, body=body, **kwargs)

    @classmethod
    def search(cls, index=None, query=None, **kwargs):
        index = index or cls.index
        return cls.es.search(index=index, body=query, **kwargs)

    @classmethod
    def scan(cls, query, index=None, **kwargs):
        return scan(cls.es, query=query, index=index or cls.index, **kwargs)

    @classmethod
    def index_name(cls, doc):
        if cls.aliase_field and cls.aliase_field in doc.keys():
            aliase_part = doc[cls.aliase_field]
            if isinstance(aliase_part, str):
                aliase_part = arrow.get(aliase_part)
            if isinstance(aliase_part, int):
                aliase_part = arrow.get(aliase_part).astimezone(cls.tz)
            if cls.version:
                index = '{}{}{}{}{}'.format(cls.index, cls.separator, cls.version,
                                            cls.separator, cls.aliase_func(aliase_part))
            else:
                index = '{}{}{}'.format(cls.index, cls.separator, cls.aliase_func(aliase_part))
        else:
            index = cls.index
        return index

    @classmethod
    def id_name(cls, doc):
        id = doc.get(cls.id_field) and doc.pop(cls.id_field) or doc.get(cls.source_id_field)
        if not id:
            print('========', doc)
        assert id, 'doc _id must not be None'
        return id

    @classmethod
    def bulk_upsert(cls, docs, **kwargs):
        """Bulk-write documents; only the 'index' and 'update' op types are supported."""
        op_type = kwargs.get('op_type') or 'update'
        chunk_size = kwargs.get('chunk_size')
        if op_type == 'update':
            upsert = kwargs.get('upsert', True)
            if upsert is None:
                upsert = True
        else:
            upsert = False
        actions = cls._gen_bulk_actions(docs, cls.index_name, cls.doc_type, cls.id_name,
                                        op_type, upsert=upsert)
        result = streaming_bulk(cls.es, actions, chunk_size=chunk_size,
                                raise_on_error=False, raise_on_exception=False,
                                max_retries=5, request_timeout=25)
        return result

    @classmethod
    def _gen_bulk_actions(cls, docs, index_name, doc_type, id_name, op_type, upsert=True, **kwargs):
        assert not upsert or (upsert and op_type == 'update'), 'upsert should use "update" as op_type'
        for doc in docs:
            # index_name may also be a factory function
            if callable(index_name):
                index = index_name(doc)
            else:
                index = index_name
            if op_type == 'index':
                _source = doc
            elif op_type == 'update' and not upsert:
                _source = {'doc': doc}
            elif op_type == 'update' and upsert:
                _source = {'doc': doc, 'doc_as_upsert': True}
            else:
                continue
            if callable(id_name):
                id = id_name(doc)
            else:
                id = id_name
            # Build the bulk action
            action = {
                "_op_type": op_type,
                "_index": index,
                "_type": doc_type,
                "_id": id,
                "_source": _source
            }
            yield action


class tiktokEsUser(ES):
    index = 'user_tiktok'
    doc_type = '_doc'
    id_field = '_id'
    source_id_field = 'user_id'
    es = es_zoo_connection


def data_es_route_task_spider(name, args, kwargs, options, task=None, **kw):
    # Route every task to the 'tiktok' topic exchange using the task name as routing key.
    return {
        'exchange': 'tiktok',
        'exchange_type': 'topic',
        'routing_key': name
    }


class DataEsConfig_download(object):
    # Broker credentials/host are placeholders.
    broker_url = 'amqp://user:password@ip:port/'
    task_ignore_result = True
    task_serializer = 'json'
    accept_content = ['json']
    task_default_queue = 'default'
    task_default_exchange = 'default'
    task_default_routing_key = 'default'

    exchange = Exchange('tiktok', type='topic')
    task_queues = [
        Queue(
            'tiktok.user_avatar.download',
            [binding(exchange, routing_key='tiktok.user_avatar.download')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.post_avatar.download',
            [binding(exchange, routing_key='tiktok.post_avatar.download')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.post.spider',
            [binding(exchange, routing_key='tiktok.post.spider')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.post.save',
            [binding(exchange, routing_key='tiktok.post.save')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.user.save',
            [binding(exchange, routing_key='tiktok.user.save')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.post_avatar.invalid',
            [binding(exchange, routing_key='tiktok.post_avatar.invalid')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.user_avatar.invalid',
            [binding(exchange, routing_key='tiktok.user_avatar.invalid')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
        Queue(
            'tiktok.comment.save',
            [binding(exchange, routing_key='tiktok.comment.save')],
            queue_arguments={'x-queue-mode': 'lazy'}
        ),
    ]
    task_routes = (data_es_route_task_spider,)

    enable_utc = True
    timezone = "Asia/Shanghai"


# Celery app for the download/spider tasks
tiktok_app = Celery(
    'tiktok',
    include=[
        'task.tasks',
    ]
)
tiktok_app.config_from_object(DataEsConfig_download)


# Producer: enqueue tasks that refresh the historical info of monitored users
def send_post():
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "exists": {
                            "field": "post_signature"
                        }
                    },
                    {
                        "range": {
                            "following_num": {
                                "gte": 1000
                            }
                        }
                    }
                ]
            }
        },
        "_source": ["region", "sec_uid", "post_signature"]
    }
    # query = {
    #     "query": {
    #         "bool": {
    #             "must": [
    #                 {"exists": {
    #                     "field": "post_signature"
    #                 }},
    #                 {
    #                     "match": {
    #                         "region": "MY"
    #                     }
    #                 }
    #             ]
    #         }
    #     },
    #     "_source": ["region", "sec_uid", "post_signature"]
    # }
    r = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)
    for item in map(lambda x: x['_source'], r):
        tiktok_app.send_task('tiktok.post.spider', args=(item,))


def send_sign_token():
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "exists": {
                            "field": "post_signature"
                        }
                    },
                    {
                        "range": {
                            "following_num": {
                                "gte": 1000
                            }
                        }
                    },
                    {
                        "range": {
                            "create_time": {
                                "gte": "2021-01-06T00:00:00",
                                "lte": "2021-01-06T01:00:00"
                            }
                        }
                    }
                ]
            }
        },
        "_source": ["user_id", "sec_uid"]
    }
    r = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)
    for item in map(lambda x: x['_source'], r):
        tiktok_app.send_task('tiktok.user.sign_token', args=(item,))


if __name__ == '__main__':
    send_post()
    # send_sign_token()
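The script above only shows the producer side; the task.tasks module it includes is not listed. Purely as a hypothetical sketch of what a matching consumer could look like (module layout and task body are assumptions; only the task name and the payload shape come from send_post()):

# Hypothetical task/tasks.py worker; not part of the original post.
from celery import Celery

worker_app = Celery('tiktok')
# Reuse the producer's config class so the exchange, queues and routing match.
worker_app.config_from_object(DataEsConfig_download)


@worker_app.task(name='tiktok.post.spider', ignore_result=True)
def spider_post(user):
    # user is the _source dict sent by send_post():
    # {'region': ..., 'sec_uid': ..., 'post_signature': ...}
    print('crawl posts for', user.get('sec_uid'))


# Run a worker bound only to the spider queue, for example:
#   celery -A task.tasks worker -Q tiktok.post.spider -l info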