// zinoma/engine/target_actor/target_actor_helper.rs

use super::{ActorId, ActorInputMessage, ExecutionKind, TargetActorOutputMessage};
use crate::domain::{TargetId, TargetMetadata};
use crate::engine::watcher::TargetInvalidatedMessage;
use crate::TerminationMessage;
use anyhow::Error;
use async_std::channel::{Receiver, Sender};
use std::collections::{HashMap, HashSet};

/// State and channel endpoints shared by the logic driving a single target.
///
/// The helper tracks whether the target still has to be executed, which dependencies are not
/// yet available, and which actors asked for the target, separately for each [`ExecutionKind`].
pub struct TargetActorHelper {
    /// Identifier of the target this helper belongs to (copied from the target metadata).
    pub target_id: TargetId,
    /// Receiving end for termination messages; only stored here, consumed by the owner.
    pub termination_events: Receiver<TerminationMessage>,
    /// Receiving end for target invalidation messages; only stored here, consumed by the owner.
    pub target_invalidated_events: Receiver<TargetInvalidatedMessage>,
    /// Receiving end for messages addressed to this actor; only stored here.
    pub target_actor_input_receiver: Receiver<ActorInputMessage>,
    /// Sending end used for every outgoing message (actor-to-actor messages and execution
    /// errors).
    pub target_actor_output_sender: Sender<TargetActorOutputMessage>,
    /// `true` while the target needs to be (re-)executed. Starts as `true`, is cleared by
    /// `set_execution_started` and set again by `notify_invalidated`.
    pub to_execute: bool,
    /// `true` once an execution succeeded without the target having been invalidated in the
    /// meantime (see `notify_success`). Starts as `false`.
    pub executed: bool,
    /// Dependencies of the target, in the order given by the target metadata. These are the
    /// recipients of `send_to_dependencies`.
    pub dependencies: Vec<TargetId>,
    /// Per execution kind, the dependencies that are not available yet. Initialised with all
    /// dependencies for both kinds; `should_execute` requires both sets to be empty.
    /// NOTE(review): nothing in the visible code removes entries — presumably the owning
    /// actor does so when dependencies report success; confirm against the caller.
    pub unavailable_dependencies: HashMap<ExecutionKind, HashSet<TargetId>>,
    /// Per execution kind, the actors that requested this target. Initialised empty for both
    /// kinds; `handle_unrequested` removes entries.
    /// NOTE(review): insertion is not visible here — presumably done by the owning actor.
    pub requesters: HashMap<ExecutionKind, HashSet<ActorId>>,
}

impl TargetActorHelper {
    /// Builds the helper for the target described by `target_metadata`.
    ///
    /// Every dependency starts out unavailable for both execution kinds, no requester is
    /// registered yet, and the target is flagged as needing execution.
    pub fn new(
        target_metadata: &TargetMetadata,
        termination_events: Receiver<TerminationMessage>,
        target_invalidated_events: Receiver<TargetInvalidatedMessage>,
        target_actor_input_receiver: Receiver<ActorInputMessage>,
        target_actor_output_sender: Sender<TargetActorOutputMessage>,
    ) -> Self {
        let dependencies = target_metadata.dependencies.clone();
        let pending: HashSet<_> = dependencies.iter().cloned().collect();

        // Both kinds get their own copy of the full dependency set.
        let unavailable_dependencies = vec![
            (ExecutionKind::Build, pending.clone()),
            (ExecutionKind::Service, pending),
        ]
        .into_iter()
        .collect();

        // Both kinds are always present as keys, so later lookups by kind cannot miss.
        let requesters = vec![
            (ExecutionKind::Build, HashSet::new()),
            (ExecutionKind::Service, HashSet::new()),
        ]
        .into_iter()
        .collect();

        Self {
            target_id: target_metadata.id.clone(),
            termination_events,
            target_invalidated_events,
            target_actor_input_receiver,
            target_actor_output_sender,
            to_execute: true,
            executed: false,
            dependencies,
            unavailable_dependencies,
            requesters,
        }
    }

    /// Tells whether an execution of the given `kind` should start now.
    ///
    /// That is the case when the target still has to be executed, at least one actor requested
    /// it for `kind`, and no dependency is unavailable for either execution kind.
    ///
    /// # Panics
    ///
    /// Panics if an inspected map lacks the looked-up kind, which `new` rules out.
    pub fn should_execute(&self, kind: ExecutionKind) -> bool {
        if !self.to_execute || self.requesters[&kind].is_empty() {
            return false;
        }

        let unavailable = &self.unavailable_dependencies;
        unavailable[&ExecutionKind::Build].is_empty()
            && unavailable[&ExecutionKind::Service].is_empty()
    }

    /// Flags the target as needing a new execution and tells the requesters of `kind` about it.
    ///
    /// Does nothing when the target is already waiting to be executed.
    pub async fn notify_invalidated(&mut self, kind: ExecutionKind) {
        if self.to_execute {
            return;
        }

        self.to_execute = true;
        self.executed = false;

        let invalidated = ActorInputMessage::Invalidated {
            kind,
            target_id: self.target_id.clone(),
        };
        self.send_to_requesters(kind, invalidated).await;
    }

    /// Records that an execution has begun: nothing is pending any more and no result exists
    /// yet.
    pub fn set_execution_started(&mut self) {
        self.executed = false;
        self.to_execute = false;
    }

    /// Records a failed execution and reports the error `e` on the output channel.
    ///
    /// A failure to send the report is ignored.
    pub async fn notify_execution_failed(&mut self, e: Error) {
        self.executed = false;

        let failure = TargetActorOutputMessage::TargetExecutionError(self.target_id.clone(), e);
        let _ = self.target_actor_output_sender.send(failure).await;
    }

    /// Asks the output channel to deliver `msg` to the actor `dest`.
    ///
    /// A failure to send is ignored.
    pub async fn send_to_actor(&self, dest: ActorId, msg: ActorInputMessage) {
        let envelope = TargetActorOutputMessage::MessageActor { dest, msg };
        let _ = self.target_actor_output_sender.send(envelope).await;
    }

    /// Sends a copy of `msg` to every dependency of the target, one after the other.
    pub async fn send_to_dependencies(&self, msg: ActorInputMessage) {
        for dest in self.dependencies.iter().cloned().map(ActorId::Target) {
            self.send_to_actor(dest, msg.clone()).await;
        }
    }

    /// Sends a copy of `msg` to every actor that requested the target for `kind`.
    ///
    /// # Panics
    ///
    /// Panics if `requesters` has no entry for `kind`, which `new` rules out.
    pub async fn send_to_requesters(&self, kind: ExecutionKind, msg: ActorInputMessage) {
        let recipients = &self.requesters[&kind];
        for dest in recipients.iter().cloned() {
            self.send_to_actor(dest, msg.clone()).await;
        }
    }

    /// Records the outcome of a successful execution and, if it is still valid, announces it.
    ///
    /// The result only counts as executed when the target was not invalidated in the meantime
    /// (i.e. `to_execute` is still `false`); only then are the requesters of `kind` sent an
    /// `Ok` message.
    pub async fn notify_success(&mut self, kind: ExecutionKind) {
        self.executed = !self.to_execute;
        if !self.executed {
            return;
        }

        let success = ActorInputMessage::Ok {
            kind,
            target_id: self.target_id.clone(),
            actual: true,
        };
        self.send_to_requesters(kind, success).await;
    }

    /// Sends every dependency a `Requested` message of the given `kind` naming this target as
    /// the requester.
    pub async fn request_dependencies(&self, kind: ExecutionKind) {
        let requester = ActorId::Target(self.target_id.clone());
        let request = ActorInputMessage::Requested { kind, requester };
        self.send_to_dependencies(request).await;
    }

    /// Removes `requester` from the requesters of `kind`.
    ///
    /// Returns `true` only when the requester was actually registered and its removal left no
    /// requester for `kind`.
    ///
    /// # Panics
    ///
    /// Panics if `requesters` has no entry for `kind`, which `new` rules out.
    pub fn handle_unrequested(&mut self, kind: ExecutionKind, requester: ActorId) -> bool {
        let registered = self.requesters.get_mut(&kind).unwrap();
        registered.remove(&requester) && registered.is_empty()
    }

    /// Sends every dependency an `Unrequested` message of the given `kind` naming this target
    /// as the former requester.
    pub async fn unrequest_dependencies(&self, kind: ExecutionKind) {
        let requester = ActorId::Target(self.target_id.clone());
        let withdrawal = ActorInputMessage::Unrequested { kind, requester };
        self.send_to_dependencies(withdrawal).await;
    }
}