Skip to content

Managed connection #14

@HaveFunTrading

Description

@HaveFunTrading
/// Source of new websocket connections for a managed connection.
trait ConnectionFactory {
    /// Underlying stream type wrapped by the produced `Websocket`; it must be
    /// both readable and writable.
    type Target: Read + Write;
    /// Attempts to produce a new websocket.
    ///
    /// Returns `Ok(Some(ws))` when a websocket was produced, `Ok(None)` when
    /// this call produced none, and `Err` on an I/O failure.
    ///
    /// NOTE(review): `Ok(None)` presumably means "not ready yet, try again
    /// later" - confirm against implementors.
    fn new_connection(&self) -> std::io::Result<Option<Websocket<Self::Target>>>;
}

/// Lifecycle notification handed to the handler passed to
/// `ManagedConnection::poll`.
enum Event<'a, T> {
    /// Emitted once, right after a websocket obtained from the factory has
    /// been stored; carries mutable access to that websocket.
    Connected(&'a mut Websocket<T>),
    /// Emitted on each poll while a websocket is held; carries mutable access
    /// to it.
    Update(&'a mut Websocket<T>),
    /// Emitted after the held websocket has been dropped because the handler
    /// returned an error for `Connected` or `Update`.
    Disconnected,
}

/// Websocket wrapper that (re)creates its connection through a
/// `ConnectionFactory` whenever none is currently held.
struct ManagedConnection<C: ConnectionFactory> {
    /// Factory asked for a new websocket when `inner` is `None`.
    connection_factory: C,
    /// Currently held websocket, or `None` while disconnected.
    inner: Option<Websocket<C::Target>>,
    /// Clock reading (nanoseconds) that must be exceeded before the next
    /// connection attempt is made; starts at 0.
    next_connect_time_ns: u64,
    /// Time source read via `cached_time_nanos()` when polling.
    clock: SharedClock,
}

impl<C: ConnectionFactory> ManagedConnection<C> {
    fn new(connection_factory: impl Into<C>, clock: SharedClock) -> Self {
        ManagedConnection {
            connection_factory: connection_factory.into(),
            inner: None,
            next_connect_time_ns: 0,
            clock,
        }
    }

    #[inline]
    const fn inner(&self) -> Option<&Websocket<C::Target>> {
        self.inner.as_ref()
    }

    #[inline]
    const fn inner_mut(&mut self) -> Option<&mut Websocket<C::Target>> {
        self.inner.as_mut()
    }

    #[inline]
    fn poll<F>(&mut self, mut handler: F)
    where
        F: FnMut(Event<C::Target>) -> Result<(), Error>,
    {
        match self.inner.as_mut() {
            None => {
                let now = self.clock.cached_time_nanos();
                if now > self.next_connect_time_ns {
                    match self.connection_factory.new_connection() {
                        Ok(Some(ws)) => {
                            let ws = self.inner.insert(ws);
                            match handler(Event::Connected(ws)) {
                                Ok(_) => {}
                                Err(err) => {
                                    error!("websocket create error, will reconnect in 1s: {}", err);
                                    let _ = self.inner.take();
                                    let _ = handler(Event::Disconnected);
                                }
                            }
                        }
                        Ok(None) => {}
                        Err(err) => {
                            error!("websocket create error, will reconnect in 1s: {}", err);
                        }
                    }
                    self.next_connect_time_ns = now + Duration::from_secs(1).as_nanos() as u64;
                }
            }
            Some(ws) => match handler(Event::Update(ws)) {
                Ok(_) => {}
                Err(err) => {
                    error!("websocket error, will reconnect in 1s: {}", err);
                    let _ = self.inner.take();
                    let _ = handler(Event::Disconnected);
                }
            },
        }
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancement: New feature or request

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions