在这个例子中,trigger_events方法决定了要发布的事实 。在这种情况下,它收集了周围的上下文 。这也可能包括请求参数(如传递到事件中的params属性) 。然而,什么是正确捕获的上下文也取决于上下文:) 。
因此,我们的最终事件可能看起来像这样 。请注意,该事件没有一个 updated_at 属性,因为我们认为事件是不可改变的事实 。我们不能撤消已经发生的事情 。
{"event_name": "PostCreatedEvent","event_id": "0fb6a4d4-ae65-4f18-be44-edb9ace6b5bb","event_version": "v1.0","time": "2022-09-03T04:16:59.294509+00:00","payload": {"user": { "user_id":"1a1269ee-6b6f-4325-8562-cb169a68e7b3", "is_blocked": false, "first_name": "Siddharth", "last_name": "R", "email": "sid@........},"post" :{ "post_id": "fa3e7b12-4908-4d53-be11-629e6f47ae90", "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", ...... },"thread": { "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", .....},"params": { .... },"errors": [ ... ],"error_code": ""},"created_at": "2022-09-03T04:16:59.294""logged_at": "2022-09-03T04:16:59.294"} 在我们的例子中,应用服务通过调用触发方法将事件转给一个叫做SystemEventsService的服务 。该方法在为我们实际发布该事件之前做了一系列的工作 。它通过我们先前看到的连续体运行,根据我们传递给它的分区键识别队列(和相应的工作者worker) 。这几乎就是我们需要一致的散列的原因 。这可以确保我们的事件总是由同一个分区(和工作者)处理 。
因此,一旦我们为我们的任务确定了工作者,我们就要求工作者
- 保留该事件,以备我们以后需要再来处理它
- 将其发布给所有相关的工作者
- 让我们订阅该任务的事件驱动型工作负载触发其工作流程 。
将事件分配到正确的分区
【使用Redis实现简单的事件驱动架构 「DDD、事件溯源和一致性哈希」】
@staticmethodasync def trigger(partition_key: str, event_dao: SystemEventDAO):try:worker: SystemEventPartitionConfig = await SystemEventsService._get_worker(partition_key=partition_key)worker_func = getattr(system_events_workers, worker.worker_name)log_info(msg=f"Trigger called with worker: {worker.worker_name}")worker_func.delay(event_dao=event_dao.json())except (OperationalError, ConnectionError) as e:log_error(msg=f"[redisError] {e}", e=e, method="trigger", loc=f"{__name__}")except Exception as e:log_error(msg=f"SystemEventError: {e}", e=e, method="trigger", loc=f"{__name__}")return@staticmethodasync def _get_worker(partition_key: str) -> SystemEventPartitionConfig:"""For a given string it returns the worker that should process the event by running it through a murmurr hashingfunction and uses that to fetch the nodes from the continuum"""node = ring.get(key=partition_key)nodename = node.get("nodename", None)if not nodename:raise ValueError("Could not find a node in the continuum for key {node}")node_config = continuum.get(nodename, None)if not node_config:raise ValueError("Could not find a node in the continuum for key {nodename}")config_attrs = {"partition_key": partition_key, "partition_id": nodename}config_attrs = {**node_config, **config_attrs}return SystemEventPartitionConfig(**config_attrs) 事件驱动的系统
下面是最好的部分:
现在整个系统可以让你把你的应用程序作为一系列的异步事件处理程序来运行,这些处理程序可以在特定的事件上被调用 。当一个事件到达正确的分区时,工作者会将该事件分配给一系列的事件处理程序 。
async def create_system_event(task_type, event_dao: SystemEventDAO, db_session: Session = None):if not db_session:db_session = get_session()system_event: Union[SystemEvent, None] = Nonetry:if event_dao.event_name not in SYSTEM_GENERATED_REQUEST_EVENTS:system_event = await SystemEventsService.create(event_dao=event_dao, db_session=db_session)if system_event:log_info(msg=f"system_event with id: {system_event.id} created for event_name: {system_event.event_name}")event_dao.id = system_event.idelse:log_info(msg=f"system_generated_request_event with name: {event_dao.event_name} ready for processing.")await EventHandler.process(event_dao=event_dao, db_session=db_session)except Exception as e:log_error(msg=f"Error handling events: {event_dao.event_name}: {e} \n {traceback.print_exc()}")capture_exc(error=e)finally:db_session.close()return system_event
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- ajax请求的五个步骤代码!ajax如何使用?
- 餐饮收银员的基本电脑常识--超市收银员的具体步骤,电脑该怎样使用?
- 数字乡村综合服务平台可实现?乡村旅游数字化发展
- 使用 python 绘制万花尺
- 看似简单但使用率非常高的7个Excel技巧,都在此文,速度围观!
- 蜜罐技术是一种什么防御技术?实现原理是什么?
- CentOS 7 Freeswitch1.10.7 对接百度MrcpServer实现 TTS和ASR
- 使用 Apache Kafka 构建您自己的社交媒体分析
- 京东科技Redis跨数据中心双向同步优化实践
- 饵料|用蚯蚓钓鱼的技巧和弊端,只有使用得当,才能发挥蚯蚓的作用
