1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
use futures::{Async, Future, Poll}; use actor::{Actor, AsyncContext, Supervised}; use address::{channel, Addr}; use arbiter::Arbiter; use context::Context; use contextimpl::ContextFut; use mailbox::DEFAULT_CAPACITY; use msgs::Execute; /// Actor supervisor /// /// Supervisor manages incoming message for actor. In case of actor failure, /// supervisor creates new execution context and restarts actor lifecycle. /// Supervisor does not does not re-create actor, it just calls `restarting()` /// method. /// /// Supervisor has same lifecycle as actor. In situation when all addresses to /// supervisor get dropped and actor does not execute anything, supervisor /// terminates. /// /// `Supervisor` can not guarantee that actor successfully process incoming /// message. If actor fails during message processing, this message can not be /// recovered. Sender would receive `Err(Cancelled)` error in this situation. /// /// ## Example /// /// ```rust /// # #[macro_use] extern crate actix; /// # use actix::prelude::*; /// #[derive(Message)] /// struct Die; /// /// struct MyActor; /// /// impl Actor for MyActor { /// type Context = Context<Self>; /// } /// /// // To use actor with supervisor actor has to implement `Supervised` trait /// impl actix::Supervised for MyActor { /// fn restarting(&mut self, ctx: &mut Context<MyActor>) { /// println!("restarting"); /// } /// } /// /// impl Handler<Die> for MyActor { /// type Result = (); /// /// fn handle(&mut self, _: Die, ctx: &mut Context<MyActor>) { /// ctx.stop(); /// # System::current().stop(); /// } /// } /// /// fn main() { /// System::run(|| { /// let addr = actix::Supervisor::start(|_| MyActor); /// /// addr.do_send(Die); /// }); /// } /// ``` pub struct Supervisor<A> where A: Supervised + Actor<Context = Context<A>>, { fut: ContextFut<A, Context<A>>, } impl<A> Supervisor<A> where A: Supervised + Actor<Context = Context<A>>, { /// Start new supervised actor in current tokio runtime. /// /// Type of returned address depends on variable type. For example to get /// `Addr<Syn, _>` of newly created actor, use explicitly `Addr<Syn, /// _>` type as type of a variable. /// /// ```rust /// # #[macro_use] extern crate actix; /// # use actix::prelude::*; /// struct MyActor; /// /// impl Actor for MyActor { /// type Context = Context<Self>; /// } /// /// # impl actix::Supervised for MyActor {} /// # fn main() { /// # System::run(|| { /// // Get `Addr` of a MyActor actor /// let addr = actix::Supervisor::start(|_| MyActor); /// # System::current().stop(); /// # });} /// ``` pub fn start<F>(f: F) -> Addr<A> where F: FnOnce(&mut A::Context) -> A + 'static, A: Actor<Context = Context<A>>, { // create actor let mut ctx = Context::new(); let act = f(&mut ctx); let addr = ctx.address(); let fut = ctx.into_future(act); // create supervisor Arbiter::spawn(Supervisor::<A> { fut }); addr } /// Start new supervised actor in arbiter's thread. pub fn start_in_arbiter<F>(sys: &Addr<Arbiter>, f: F) -> Addr<A> where A: Actor<Context = Context<A>>, F: FnOnce(&mut Context<A>) -> A + Send + 'static, { let (tx, rx) = channel::channel(DEFAULT_CAPACITY); sys.do_send(Execute::new(move || -> Result<(), ()> { let mut ctx = Context::with_receiver(rx); let act = f(&mut ctx); let fut = ctx.into_future(act); Arbiter::spawn(Supervisor::<A> { fut }); Ok(()) })); Addr::new(tx) } } #[doc(hidden)] impl<A> Future for Supervisor<A> where A: Supervised + Actor<Context = Context<A>>, { type Item = (); type Error = (); fn poll(&mut self) -> Poll<Self::Item, Self::Error> { loop { match self.fut.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(_)) | Err(_) => { // stop if context's address is not connected if !self.fut.restart() { return Ok(Async::Ready(())); } } } } } }